---
slug: multi-round-local-merge
title: "Multi-Round Lazy Start Merge"
authors: [duanmeng, xiaoxmeng, pedroerp]
tags: [tech-blog, spill, operator]
---

## Background

Efficiently merging sorted data partitions at scale is crucial for a variety of training data preparation
workloads, especially for Generative Recommenders (GRs), a new paradigm introduced in the paper
*[Actions Speak Louder than Words: Trillion-Parameter Sequential Transducers for Generative Recommendations](https://arxiv.org/pdf/2402.17152)*.
A key requirement is to merge training data across partitions—for example, merging hourly partitions into
daily ones—while ensuring that all rows sharing the same primary key are stored consecutively. Training data is
typically partitioned and bucketed by primary key, with rows sharing the same key
stored consecutively, so merging across partitions essentially becomes a [multi-way merge problem](https://en.wikipedia.org/wiki/K-way_merge_algorithm#).
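
To make the multi-way merge framing concrete, here is a minimal Python sketch using the standard library's `heapq.merge`. The partition contents and variable names are made up for illustration; this is not how Spark or Velox implements the merge.

```python
import heapq
from operator import itemgetter

# Hypothetical hourly partitions, each already sorted by primary key
# (the first field of every row).
hour_00 = [(1, "a0"), (3, "c0"), (5, "e0")]
hour_01 = [(1, "a1"), (2, "b1"), (5, "e1")]
hour_02 = [(3, "c2"), (4, "d2")]

# heapq.merge lazily performs a k-way merge over already-sorted inputs.
daily = list(heapq.merge(hour_00, hour_01, hour_02, key=itemgetter(0)))

# After the merge, rows sharing a primary key sit next to each other.
keys = [key for key, _ in daily]
```

Because every input is already sorted, the merge only compares the current head row of each partition and never needs to re-sort the whole dataset.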

Normally, [Apache Spark](https://spark.apache.org/) can be used for this sort-merge requirement, for example via `CLUSTER BY`.
However, training datasets for a single job can often reach the PB scale, which in turn generates shuffle data at PB scale.
Because we typically apply bucketing and ordering by key when preparing training data in production,
Spark can eliminate the shuffle when merging training data from multiple hourly partitions.
Even so, each Spark task can only read the files planned from various partitions within a split
sequentially, placing them into the sorter and spilling as needed. Only after all files have been read
does Spark perform a sort-merge of the spilled files. This process produces a large number of small
spill files, which further degrades efficiency.

Moreover, Spark’s spill is row-based with a low compression ratio, resulting in approximately 4x
size amplification compared to the original columnar training data in the data lake. These factors
significantly degrade task stability and performance. Velox has a `LocalMerge` operator designed for this
kind of merge, and it can be introduced into Apache Spark via [Gluten](https://gluten.apache.org/) or [PySpark on Velox](https://www.youtube.com/watch?v=oq5M2861WaQ).

*Note: To keep the focus on merging, the remainder of this article assumes that each partition’s
training data is already sorted by primary key, a common setup in training data pipelines.*


## LocalMerge Operator

The `LocalMerge` operator consolidates its sources’ outputs into a single, sorted stream of rows.
It runs single-threaded, while its upstream sources may run multi-threaded within the same task,
producing multiple sorted inputs concurrently. For example, when merging 24 hourly partitions into
a single daily partition (as shown in the figure below), the merge plan fragment is split into two pipelines:

- Pipeline 0: contains two operators, `TableScan` and `CallbackSink`. 24 drivers are instantiated to scan the 24 hourly partitions.
- Pipeline 1: contains only a single operator, `LocalMerge`, with one driver responsible for performing the sort merge.

A `CallbackSink` operator is installed at the end of each driver in Pipeline 0. It pushes the `TableScan`
operator’s output vectors into the queues backing the merge streams. Inside `LocalMerge`, a `TreeOfLosers`
performs a k-way merge over the 24 merge streams supplied by the Pipeline 0 drivers, producing a single,
globally sorted output stream.
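
The producer/consumer shape of the two pipelines can be sketched in Python as follows. This is only an illustration: `scan_driver`, `merge_stream`, and `local_merge` are hypothetical names, threads stand in for Velox drivers, and `heapq.merge` stands in for the `TreeOfLosers` (it is a heap-based k-way merge, not a loser tree).

```python
import heapq
import queue
import threading

END = None  # sentinel marking the end of one merge stream


def scan_driver(rows, out_queue, batch_size=2):
    """Stand-in for TableScan + CallbackSink: push sorted batches to a queue."""
    for i in range(0, len(rows), batch_size):
        out_queue.put(rows[i:i + batch_size])
    out_queue.put(END)


def merge_stream(in_queue):
    """Yield rows from the queue backing one merge stream until END arrives."""
    while True:
        batch = in_queue.get()
        if batch is END:
            return
        yield from batch


def local_merge(partitions):
    """Single-threaded k-way merge fed by one scan thread per partition."""
    # Bounded queues keep each producer from running far ahead of the merge.
    queues = [queue.Queue(maxsize=4) for _ in partitions]
    drivers = [
        threading.Thread(target=scan_driver, args=(rows, q))
        for rows, q in zip(partitions, queues)
    ]
    for driver in drivers:
        driver.start()
    merged = list(heapq.merge(*(merge_stream(q) for q in queues)))
    for driver in drivers:
        driver.join()
    return merged
```

The bounded queues are a design choice of this sketch: they let a fast scan thread block instead of buffering an unbounded amount of data while the single merge thread catches up.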


<figure>
    <img src="/img/merge.png" height= "100%" width="100%"/>
</figure>

## Multi-Round Spill Merge

Although `LocalMerge` minimizes comparisons during merging, preserves row-ordering guarantees, and cleanly
isolates the single-threaded merge from the multi-threaded scan phase for predictable performance, it can
cause substantial memory pressure—particularly in training-data pipelines. In these workloads, extremely
wide tables are common, and even after column pruning, thousands of columns may remain.

Moreover, training data is typically stored in [PAX-style formats such as Parquet, ORC, or DWRF](https://www.vldb.org/pvldb/vol17/p148-zeng.pdf).
Using Parquet as an example, the reader often needs to keep at least one page per column in memory. As a result,
simply opening a Parquet file with thousands of columns can consume significant memory even before any
merging occurs. Wide schemas further amplify per-column metadata, dictionary pages, and decompression buffers,
inflating the overall footprint. In addition, the k-way merge must hold input vectors from multiple sources
concurrently, which drives peak memory usage even higher.
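
A back-of-the-envelope calculation shows how quickly this adds up. The numbers below (3,000 columns, a 1 MiB page buffer per column) are assumptions chosen for illustration, not measurements from production, and the formula ignores metadata, dictionaries, and merge buffers.

```python
MiB = 1024 ** 2
GiB = 1024 ** 3


def reader_footprint_bytes(num_columns, page_bytes, open_files):
    """Rough lower bound: one buffered page per column per open file."""
    return num_columns * page_bytes * open_files


# Assumed values for illustration only.
all_at_once = reader_footprint_bytes(3000, 1 * MiB, open_files=24)
eight_files = reader_footprint_bytes(3000, 1 * MiB, open_files=8)

print(f"24 files open: {all_at_once / GiB:.1f} GiB")
print(f" 8 files open: {eight_files / GiB:.1f} GiB")
```

Under these assumptions, the reader buffers alone scale linearly with the number of files open at once, which is why limiting concurrently open sources is an effective lever.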

To cap memory usage and avoid OOM when merging a large number of partitions, we extend `LocalMerge` to
process fewer local sources at a time, leverage existing spill facilities to persist intermediate
results, and introduce lazy-start activation for merge inputs. Using the case of merging 24 hourly
partitions into a single daily partition, the process is organized into two phases:

**Phase 1**

1. Break the scan-and-merge into multiple rounds (e.g., 3 rounds).
2. In each round, lazily start a limited number of drivers (e.g., drivers 0–7, eight at a time).
3. The started drivers scan data and push it into the queues backing their respective merge streams.
4. Perform an in-memory k-way merge and spill the results, producing a spill-file group (one or more spill files per group).
5. After all inputs from drivers 0–7 are consumed and spilled, close those drivers, close the file streams opened by their `TableScan` operators, and release the associated memory.
6. Repeat the above steps for the remaining rounds (drivers 8–15, then drivers 16–23), ensuring peak memory stays within budget.
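
The Phase 1 steps above can be sketched in Python as follows. All names here (`merge_in_rounds`, `spill_sorted_run`, `read_spill`) are hypothetical, `pickle` files stand in for Velox spill files, and each round writes a single file rather than a group of files.

```python
import heapq
import os
import pickle
import tempfile


def spill_sorted_run(rows, spill_dir, round_id):
    """Write one sorted run to disk and return its path."""
    path = os.path.join(spill_dir, f"round-{round_id}.spill")
    with open(path, "wb") as f:
        for row in rows:
            pickle.dump(row, f)
    return path


def read_spill(path):
    """Stream rows back from a spill file written by spill_sorted_run."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def merge_in_rounds(open_source_fns, max_sources_per_round, spill_dir):
    """Merge and spill sorted sources, a limited number per round."""
    spill_paths = []
    starts = range(0, len(open_source_fns), max_sources_per_round)
    for round_id, start in enumerate(starts):
        # Lazy start: a source is opened only when its round begins.
        round_fns = open_source_fns[start:start + max_sources_per_round]
        sources = [open_fn() for open_fn in round_fns]
        merged = heapq.merge(*sources)
        spill_paths.append(spill_sorted_run(merged, spill_dir, round_id))
        # The round's sources are exhausted; drop them before the next round.
        del sources, merged
    return spill_paths
```

With 24 source factories and `max_sources_per_round=8`, this produces three sorted spill files, and at most eight sources are ever open at the same time.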

**Phase 2**

1. Create a concatenated file stream for each spill-file group produced in Phase 1.
2. Schedule one async callback for each concatenated stream to prefetch and push data into a merge stream.
3. Merge the outputs of the three merge streams using a k-way merge (e.g., a loser-tree), and begin streaming the final, globally sorted results to downstream operators.
4. Adaptively limit the number of rows per output batch, using a row size estimated from the merge streams; each stream's estimate is the average row size of its first batch.
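
A simplified Python sketch of the final merge and the adaptive batch limit is shown below. It leaves out the asynchronous prefetch, and several details are assumptions of this sketch rather than Velox behavior: the names `average_row_bytes` and `final_merge`, the `target_batch_bytes` parameter, and the choice to take the largest per-stream estimate.

```python
import heapq
from itertools import chain, islice


def average_row_bytes(first_batch):
    """Average payload size of the rows in a stream's first batch."""
    return sum(len(payload) for _key, payload in first_batch) / len(first_batch)


def final_merge(spill_groups, target_batch_bytes):
    """Merge sorted streams and yield output batches of a bounded size.

    Each stream is a non-empty list of batches; each batch is a non-empty
    list of (key, payload) rows sorted by key.
    """
    # Estimate the row size from the first batch of every stream.
    estimates = [average_row_bytes(group[0]) for group in spill_groups]
    # Assumption of this sketch: use the largest estimate to stay conservative.
    row_bytes = max(estimates)
    rows_per_batch = max(1, int(target_batch_bytes // row_bytes))

    streams = (chain.from_iterable(group) for group in spill_groups)
    merged = heapq.merge(*streams, key=lambda row: row[0])
    while True:
        batch = list(islice(merged, rows_per_batch))
        if not batch:
            return
        yield batch
```

Sizing batches by an estimated row width, rather than a fixed row count, keeps output batches from ballooning when rows are very wide.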

<figure>
    <img src="/img/spill.merge.png" height= "100%" width="100%"/>
</figure>

## How To Use

Set `local_merge_spill_enabled` to `true` to enable spilling for the `LocalMerge`
operator (it is `false` by default). Then, set `local_merge_max_num_merge_sources` to
control the number of merge sources per round according to your memory management strategy.

*Note: An executor must be configured for spilling, because an asynchronous callback is scheduled
on it for each concatenated stream to prefetch data and push it into the merge stream.*
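
As an illustration, the two settings can be pictured as entries in a string-valued configuration map. How that map reaches Velox depends on the embedding engine and is not shown here; the helper `num_merge_rounds` is hypothetical and only demonstrates how the source limit translates into rounds.

```python
import math

# The two keys come from this article; the value 8 is just an example.
query_config = {
    "local_merge_spill_enabled": "true",
    "local_merge_max_num_merge_sources": "8",
}


def num_merge_rounds(num_sources, config):
    """Hypothetical helper: rounds needed for a given number of sources."""
    if config.get("local_merge_spill_enabled", "false") != "true":
        return 1  # spilling disabled: all sources are merged in one pass
    per_round = int(config["local_merge_max_num_merge_sources"])
    return math.ceil(num_sources / per_round)
```

For the running example, 24 hourly partitions with 8 sources per round give 3 rounds, matching the Phase 1 walkthrough above.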

## Future Work

We plan to adjust the number of merge sources dynamically based on available memory, rather than
fixing it with the `local_merge_max_num_merge_sources` parameter. The process would start with a small
number of sources, such as 2, and incrementally increase this number for subsequent
rounds (e.g., to 4) as long as sufficient memory is available. The number of sources would stop increasing
once it reaches a memory-constrained limit.
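
One possible shape for such a policy is sketched below in Python. It is purely illustrative: `plan_round_sizes` and the `fits_in_memory` callback are hypothetical, and doubling the source count each round is an assumption of this sketch, not a committed design.

```python
def plan_round_sizes(total_sources, fits_in_memory, initial=2):
    """Return the number of sources to merge in each round.

    fits_in_memory(n) is a hypothetical callback that reports whether
    merging n sources at once is expected to stay within the memory budget.
    """
    sizes = []
    width = initial
    remaining = total_sources
    while remaining > 0:
        take = min(width, remaining)
        sizes.append(take)
        remaining -= take
        # Grow for the next round only while the larger width still fits.
        if fits_in_memory(width * 2):
            width *= 2
    return sizes


# Example: memory allows at most 8 concurrent sources.
rounds = plan_round_sizes(24, fits_in_memory=lambda n: n <= 8)
```

In this example the plan ramps up from 2 to 8 sources per round and then holds at 8, the memory-constrained limit.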

## Acknowledgements

Thanks to [Xiaoxuan Meng](https://www.linkedin.com/in/xiaoxuanmeng/) and [Pedro Pedreira](https://www.linkedin.com/in/pedro-pedreira/) for their guidance, review, and brainstorming.
I also appreciate the excellent collaboration and work from my colleagues,
[Xiang Yao](https://www.linkedin.com/in/%E7%BF%94-%E5%A7%9A-1b513a359/),
[Gang Wang](https://github.com/zjuwangg),
and [Weixin Xu](https://www.linkedin.com/in/xu-weixin-75b06786/).
